package cn.yh.kafkaDemo.producer;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Small demo producer that publishes a fixed range of numbered string
 * messages to {@link #TOPIC} using the legacy
 * {@code kafka.javaapi.producer.Producer} client.
 *
 * <p>Run via {@link #main(String[])}; the producer is always closed once
 * sending finishes or fails.
 */
public class KafkaProducer {
	/** Topic every message is published to. */
	public static final String TOPIC = "track-v1-local";

	/** First message number sent (inclusive). */
	private static final int FIRST_MESSAGE_NO = 1000;
	/** Upper bound of the message numbers (exclusive). */
	private static final int MESSAGE_NO_LIMIT = 10000;

	private final Producer<String, String> producer;

	/**
	 * Builds the producer configuration and creates the underlying client.
	 */
	private KafkaProducer() {
		Properties props = new Properties();
		// Kafka broker addresses (host:port) used by the producer.
		props.put("metadata.broker.list", "172.31.1.41:9092,172.31.1.42:9092,172.31.1.43:9092");
		// A ZooKeeper address ("zk.connect" -> "localhost:2181") was previously
		// left disabled here; it is intentionally not configured.

		// Serializer class for message values.
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		// Serializer class for message keys.
		props.put("key.serializer.class", "kafka.serializer.StringEncoder");

		// Acknowledgement level; see the Kafka producer configuration docs
		// for the meaning of "-1" (presumably: wait for all in-sync replicas).
		props.put("request.required.acks", "-1");

		producer = new Producer<String, String>(new ProducerConfig(props));
	}

	/**
	 * Sends one message per number in
	 * [{@code FIRST_MESSAGE_NO}, {@code MESSAGE_NO_LIMIT}) to {@link #TOPIC}.
	 * The number (as a string) is used as the message key, and each payload is
	 * echoed to standard output after it has been handed to the producer.
	 */
	void produce() {
		for (int messageNo = FIRST_MESSAGE_NO; messageNo < MESSAGE_NO_LIMIT; messageNo++) {
			String key = String.valueOf(messageNo);
			String data = "{version:hello 生产者" + key + "}";
			producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
			System.out.println(data);
		}
	}

	/**
	 * Releases the resources held by the underlying producer. The instance
	 * must not be used for sending after this call.
	 */
	private void close() {
		producer.close();
	}

	/**
	 * Entry point: creates a producer, sends the demo messages and closes the
	 * producer even if sending throws.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		KafkaProducer demo = new KafkaProducer();
		try {
			demo.produce();
		} finally {
			// Without this the producer's broker connections were never released.
			demo.close();
		}
	}
}
